Data Ingestion using Amazon Kinesis Data Streams with Snowflake Data Lake
This topic describes how to create a data ingestion pipeline that uses Amazon Kinesis Data Streams as the data source, Databricks for data integration, and a Snowflake data lake as the destination.
Prerequisites
- Access to a configured Snowflake account, which is used as the data lake in the pipeline.
- A configured instance of Amazon Kinesis Data Streams. For information about configuring Kinesis, refer to the following topic: Configuring Amazon Kinesis Data Streams
Creating a data ingestion pipeline
- On the home page of Data Pipeline Studio, add the following stages and connect them as shown below:
- Data Source: Amazon Kinesis Data Streams
- Data Integration: Databricks
- Data Lake: Snowflake
- Configure the Kinesis node and the Snowflake node.
- Click the Databricks node and click Create Job.
- Complete the following steps to create a data integration job:
Job Name
Provide job details for the data integration job:
- Template - Based on the source and destination that you choose in the data pipeline, the template is selected automatically.
- Job Name - Provide a name for the data integration job.
- Node Rerun Attempts - Specify the number of times the job is rerun if it fails. By default, this setting is inherited from the pipeline level.
Source
- Source - This is selected automatically depending on the source added in the pipeline.
- Datastore - The datastore that you configured for the Kinesis node is auto-populated.
- Stream Start Position - The position in the data stream from which to start streaming (see the sketch after this list). You can choose from the following options:
  - latest - Start streaming from the most recent record in the shard, so that you always read the newest data.
  - trim_horizon - Start streaming from the last untrimmed record in the shard, that is, the oldest record available.
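The platform generates and manages the Databricks job for you, but conceptually the start-position setting corresponds to the initialPosition option of the Databricks Kinesis source. The following is only a minimal sketch, assuming a Databricks notebook with an ambient spark session; the stream name, region, and column handling are placeholders, not values produced by the platform.

```python
from pyspark.sql.functions import col

# Minimal sketch of a Kinesis read on Databricks; stream name and region are placeholders.
raw_df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "my-kinesis-stream")   # hypothetical stream name
    .option("region", "us-east-1")               # hypothetical region
    # "latest" reads only newly arriving records; "trim_horizon" starts at the
    # oldest untrimmed record in each shard.
    .option("initialPosition", "latest")
    .load()
)

# The Kinesis source delivers the record payload as binary in the `data` column.
events_df = raw_df.select(col("data").cast("string").alias("payload"))
```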
Target
- Target - This is selected automatically depending on the target added in the pipeline.
- Datastore - The datastore that you configured for the Snowflake node is auto-populated.
- Warehouse - The warehouse associated with the configured Snowflake account (these connection settings correspond roughly to the connector options sketched after this list).
- Database - The database associated with the configured Snowflake account.
- Schema - The schema associated with the database.
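Writes from Databricks to Snowflake typically go through the Spark Snowflake connector, whose connection options map roughly to the fields above. The following is a minimal sketch in which every value is a placeholder; the account URL, credentials, warehouse, database, and schema are assumptions, not values taken from the platform.

```python
# Hypothetical Snowflake connection options for the Spark Snowflake connector.
sf_options = {
    "sfUrl": "myaccount.snowflakecomputing.com",  # placeholder account URL
    "sfUser": "PIPELINE_USER",                    # placeholder credentials
    "sfPassword": "********",
    "sfWarehouse": "INGEST_WH",                   # Warehouse
    "sfDatabase": "RAW_DB",                       # Database
    "sfSchema": "KINESIS",                        # Schema
}
```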
Column Mapping
In this step, you map the columns from the target data to custom columns. This lets you edit the columns in the target: you can deselect the columns that you do not require and rename columns as per your requirement (see the sketch after this list).
- Filter columns - From the list of columns that is populated, select or deselect columns according to your use case and provide custom names to columns while mapping them. You can use the search option to find specific columns.
- Default Column Name - View the default names of the columns.
- Custom Column Name - Map the columns from the target table to the default column names.
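In Spark terms, this step is roughly equivalent to selecting and renaming columns before the write. A small sketch using hypothetical column names and the events_df from the earlier sketch:

```python
from pyspark.sql.functions import col

# Keep only the required columns and rename them to the custom target names.
mapped_df = events_df.select(
    col("event_id").alias("EVENT_ID"),         # hypothetical default -> custom name
    col("event_ts").alias("EVENT_TIMESTAMP"),
)
```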
Data Mapping
In this step, you map the source data to the target. In this example, because the target is Snowflake, you map the data stream to a Snowflake table.
- Map source data to target tables - Select a table on the target to which the source data must be mapped. You can either select an existing table or create a new one. To create a new table, provide the table name and click to create it.
- Select an operation to perform on the data from the following options (see the sketch after this list):
  - Append - Adds new data at the end of the table without deleting the existing content.
  - Overwrite - Replaces the entire content of the table with the new data.
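Conceptually, Append and Overwrite correspond to Spark save modes when writing to the Snowflake table. A minimal sketch, reusing the hypothetical sf_options and mapped_df from the earlier sketches; in a streaming job this write would typically run on each micro-batch inside a foreachBatch handler, and the table name and checkpoint path below are placeholders.

```python
def write_to_snowflake(batch_df, batch_id):
    # Append adds rows to the table; use mode("overwrite") to replace its contents.
    (
        batch_df.write
        .format("snowflake")
        .options(**sf_options)
        .option("dbtable", "KINESIS_EVENTS")  # hypothetical target table
        .mode("append")
        .save()
    )

# Attach the handler to the streaming DataFrame from the earlier sketches.
query = (
    mapped_df.writeStream
    .foreachBatch(write_to_snowflake)
    .option("checkpointLocation", "dbfs:/checkpoints/kinesis_to_snowflake")  # hypothetical path
    .start()
)
```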
Cluster Configuration
You can select an all-purpose cluster or a job cluster to run the configured job; a sketch of how these settings map to a Databricks cluster specification appears after the Notifications step below. In case your Databricks cluster is not created through the Lazsa Platform and you want to update custom environment variables, refer to the following:
All Purpose Clusters
- Cluster - Select the all-purpose cluster that you want to use for the data integration job from the dropdown list.
Job Cluster
- Choose Cluster - Provide a name for the job cluster that you want to create.
- Job Configuration Name - Provide a name for the job cluster configuration.
- Databricks Runtime Version - Select the appropriate Databricks Runtime version.
- Worker Type - Select the worker type for the job cluster.
- Workers - Enter the number of workers to be used for running the job in the job cluster. You can either have a fixed number of workers or choose autoscaling.
- Enable Autoscaling - Autoscaling scales the number of workers up or down within the range that you specify. This helps reallocate workers to a job during its compute-intensive phase; once the compute requirement reduces, the excess workers are removed, which helps control your resource costs.
Cloud Infrastructure Details
- First on Demand - Provide the number of cluster nodes that are marked as first_on_demand. The first_on_demand nodes of the cluster are placed on on-demand instances.
- Availability - Choose the type of EC2 instances on which to launch your Apache Spark clusters, from the following options:
  - Spot
  - On-demand
  - Spot with fallback
- Zone - Identifier of the availability zone or data center in which the cluster resides. The provided availability zone must be in the same region as the Databricks deployment.
- Instance Profile ARN - Provide an instance profile ARN that can access the target Snowflake instance.
- EBS Volume Type - The type of EBS volume that is launched with this cluster.
- EBS Volume Count - The number of volumes launched for each instance of the cluster.
- EBS Volume Size - The size of the EBS volume to be used for the cluster.
Additional Details
- Spark Config - To fine-tune Spark jobs, provide custom Spark configuration properties as key-value pairs.
- Environment Variables - Configure custom environment variables that you can use in init scripts.
- Logging Path (DBFS Only) - Provide the logging path to deliver the logs for the Spark jobs.
- Init Scripts - Provide the init (initialization) scripts that run during the startup of each cluster.
Notifications
You can configure the SQS and SNS services to send notifications related to the node in this job. This provides information about various events related to the node without actually connecting to the Lazsa Platform. A sketch of consuming these events also appears after these steps.
- SQS and SNS Configurations - Select an SQS or SNS configuration that is integrated with the Lazsa Platform.
- Events - Enable the events for which you want to receive notifications:
  - Select All
  - Node Execution Failed
  - Node Execution Succeeded
  - Node Execution Running
  - Node Execution Rejected
- Event Details - Select the details of the events that you want to include in the notifications from the dropdown list.
- Additional Parameters - Provide any additional parameters to be added to the SQS and SNS notifications. A sample JSON is provided; you can use it to write logic for processing the events.
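As mentioned in the Cluster Configuration step, the job-cluster fields above correspond closely to a Databricks cluster specification. The platform creates the cluster for you; the following is only an illustrative sketch of that mapping, and every value is a placeholder rather than a recommendation.

```python
# Illustrative mapping of the job-cluster form fields to a Databricks cluster spec.
new_cluster = {
    "spark_version": "13.3.x-scala2.12",                 # Databricks Runtime Version
    "node_type_id": "i3.xlarge",                         # Worker Type
    "autoscale": {"min_workers": 2, "max_workers": 8},   # Workers, with autoscaling enabled
    "aws_attributes": {
        "first_on_demand": 1,                            # First on Demand
        "availability": "SPOT_WITH_FALLBACK",            # Spot | On-demand | Spot with fallback
        "zone_id": "us-east-1a",                         # Zone
        "instance_profile_arn": "arn:aws:iam::111122223333:instance-profile/snowflake-access",
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",        # EBS Volume Type
        "ebs_volume_count": 1,                           # EBS Volume Count
        "ebs_volume_size": 100,                          # EBS Volume Size
    },
    "spark_conf": {"spark.sql.shuffle.partitions": "200"},                  # Spark Config
    "spark_env_vars": {"PIPELINE_ENV": "dev"},                              # Environment Variables
    "cluster_log_conf": {"dbfs": {"destination": "dbfs:/cluster-logs"}},    # Logging Path (DBFS Only)
    "init_scripts": [{"dbfs": {"destination": "dbfs:/init/install-deps.sh"}}],  # Init Scripts
}
```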
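As a companion to the Notifications step, the sketch below shows one way to poll an SQS queue for these node events with boto3. The queue URL and the event field names (such as eventType) are assumptions for illustration only; rely on the sample JSON shown in the platform for the actual event schema.

```python
import json
import boto3

# Hypothetical queue URL; replace with the SQS queue configured for notifications.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/111122223333/pipeline-notifications"

sqs = boto3.client("sqs", region_name="us-east-1")

response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,  # long polling
)

for message in response.get("Messages", []):
    event = json.loads(message["Body"])
    # The "eventType" field name is an assumption; check the platform's sample JSON.
    if event.get("eventType") == "NODE_EXECUTION_FAILED":
        print("Node execution failed:", event)
    # Remove the message once it has been processed.
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])
```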
Running a data ingestion pipeline
After you have created the data integration job with Amazon Kinesis Data Streams, you can run the pipeline in the following way:
- After the job creation is complete, ensure that you publish the pipeline. If you haven't already done so, click Publish.
- Open the Data Streams window, which provides a list of the data streams in the pipeline. Enable the toggle for the stream that you want to use to fetch data.
  The data stream that you enabled is now running. Click the refresh icon to view the latest information about the number of events processed.
Troubleshooting a failed data integration job
When you click the Databricks node in the pipeline, you can tell whether your data integration job has failed by looking at the status of the job.
- Click the Databricks node in the pipeline.
- Check the status of the Databricks integration job. The status can be one of the following:
  - Running
  - Canceled
  - Pending
  - Failed
- If the job status is Failed, click the ellipsis (...) and then click Open Databricks Dashboard.
- You are navigated to the specific Databricks job, which shows the list of job runs. Click the job run whose details you want to view. You can also list a job's runs through the Databricks Jobs API, as sketched below.
- View the details and check for errors.
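If you prefer to check the same information outside the UI, the Databricks Jobs API can list recent runs of a job and their states. The following is a minimal sketch; the workspace URL, access token, and job ID are placeholders.

```python
import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder workspace URL
TOKEN = "<personal-access-token>"                                   # placeholder token
JOB_ID = 12345                                                      # hypothetical job ID

response = requests.get(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/list",
    headers={"Authorization": f"Bearer {TOKEN}"},
    params={"job_id": JOB_ID, "limit": 5},
)
response.raise_for_status()

# Print the lifecycle and result state of the most recent runs.
for run in response.json().get("runs", []):
    state = run.get("state", {})
    print(run["run_id"], state.get("life_cycle_state"), state.get("result_state"))
```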
What's next? Data Ingestion using Amazon Kinesis Data Streams